bf684eea956dbbbf7a35ef3deca8b4462b139136,src/org/apache/cassandra/db/ColumnFamilyStore.java,ColumnFamilyStore,doFileCompaction,#List#number#,1230

Before Change


	                    {
	                    	try
	                    	{
	                    		filestruct = getNextKey(filestruct);
	                    		if(filestruct == null)
	                    		{
	                    			continue;
	                    		}

After Change


	                    fs = pq.poll();                        
	                }
	                if (fs != null
	                        && (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
	                {
	                    // The key matches the one currently being collected (or is the
	                    // first key seen), so add this file struct to the lfs list
	                    lastkey = fs.getKey();
	                    lfs.add(fs);
	                }
	                else
	                {
	                    Collections.sort(lfs, new FileStructComparator());
	                    ColumnFamily columnFamily = null;
	                    bufOut.reset();
	                    if(lfs.size() > 1)
	                    {
		                    for (FileStruct filestruct : lfs)
		                    {
		                    	try
		                    	{
	                                /* read the length although we don't need it */
	                                filestruct.getBufIn().readInt();
	                                // Skip the Index
                                    IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
	                                // We want to add only 2 and resolve them right there in order to save on memory footprint
	                                if(columnFamilies.size() > 1)
	                                {
	    		                        merge(columnFamilies);
	                                }
			                        // deserialize into column families                                    
			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
		                    	}
		                    	catch ( Exception ex)
		                    	{
                                    logger_.warn("error in filecompaction", ex);
                                }
		                    }
		                    // Every column family for this key has been read; resolve them and serialize the result for the sstable
		                    columnFamily = resolveAndRemoveDeleted(columnFamilies);
		                    columnFamilies.clear();
		                    if( columnFamily != null )
		                    {
			                	/* serialize the cf with column indexes */
			                    ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
		                    }
	                    }
	                    else
	                    {
		                    FileStruct filestruct = lfs.get(0);
	                    	try
	                    	{
		                        /* read the length; it is the number of bytes copied straight into bufOut */
		                        int size = filestruct.getBufIn().readInt();
		                        bufOut.write(filestruct.getBufIn(), size);
	                    	}
	                    	catch ( Exception ex)
	                    	{
	                    		ex.printStackTrace();
	                            filestruct.close();
	                            continue;
	                    	}
	                    }
	                    	         
	                    if ( ssTable == null )
	                    {
	                    	ssTable = new SSTable(compactionFileLocation, mergedFileName);	                    	
	                    }
                        ssTable.append(lastkey, bufOut);

                        /* Fill the bloom filter with the key */
	                    doFill(compactedBloomFilter, lastkey);                        
	                    totalkeysWritten++;
	                    for (FileStruct filestruct : lfs)
	                    {
	                    	try
	                    	{
                                filestruct.getNextKey();
	                    		if (filestruct.isExhausted())
	                    		{
	                    			continue;